package cn.edu.flink.tutorial.window;


import cn.edu.flink.tutorial.source.UDSourceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Tutorial job that applies a non-keyed, processing-time session window to a stream.
 *
 * <p>Every element produced by {@link UDSourceFunction} is paired with the constant {@code 1L},
 * the resulting tuples are grouped with {@code windowAll} into session windows that use a
 * one-second gap, and each window is collapsed by a reduce function whose result is printed.
 */
public class SessionWindow {

    /**
     * Assembles the pipeline and submits it for execution.
     *
     * @param args command-line arguments; not read by this job
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The tuple type is declared explicitly via returns(...) for the lambda-based map.
        final DataStream<Tuple2<Long, Long>> streamSource = env
                .addSource(new UDSourceFunction())
                .map(x -> Tuple2.of(x, 1L))
                .returns(Types.TUPLE(Types.LONG, Types.LONG));

        // Gap handed to the session-window assigner.
        final Time sessionGap = Time.seconds(1);

        // Combines two tuples into (f0 of the first argument, f0 of the second argument).
        // NOTE(review): the 1L stored in f1 by the map above is never read here; if a
        // per-session count was intended, this should add the f1 fields instead - confirm.
        final ReduceFunction<Tuple2<Long, Long>> firstWithLatest =
                (left, right) -> Tuple2.of(left.f0, right.f0);

        streamSource
                .windowAll(ProcessingTimeSessionWindows.withGap(sessionGap))
                .reduce(firstWithLatest)
                .print();

        env.execute();
    }
}
